-
Notifications
You must be signed in to change notification settings - Fork 825
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Added busy counter for sockets and various fixes for data race problems #2893
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2893 +/- ##
==========================================
- Coverage 64.88% 64.67% -0.21%
==========================================
Files 101 101
Lines 17634 17565 -69
==========================================
- Hits 11441 11360 -81
- Misses 6193 6205 +12 ☔ View full report in Codecov by Sentry. |
const bool drift_updated = | ||
#endif | ||
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); | ||
//leaveCS(m_RcvBufferLock); |
Check notice
Code scanning / CodeQL
Commented-out code Note
@@ -8723,11 +8723,14 @@ | |||
// srt_recvfile (which doesn't make any sense), you'll have a deadlock. | |||
if (m_config.bDriftTracer) | |||
{ | |||
//enterCS(m_RcvBufferLock); |
Check notice
Code scanning / CodeQL
Commented-out code Note
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am also not sure why m_GlobControlLock
is locked here.
But m_RcvBufferLock
protection is required.
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
// This is only for a use together with an empty constructor. | ||
bool acquire(CUDTUnited& glob, CUDTSocket* s) | ||
{ | ||
bool caught = glob.acquireSocket(s); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bool caught = glob.acquireSocket(s); | |
const bool caught = glob.acquireSocket(s); |
struct ForceDestructor | ||
{ | ||
CUDTSocket* ps; | ||
ForceDestructor(): ps(NULL){} | ||
~ForceDestructor() | ||
{ | ||
if (ps) // Could be not acquired by SocketKeeper, occasionally | ||
{ | ||
HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy()); | ||
} | ||
} | ||
} fod; | ||
#endif | ||
|
||
SocketKeeper k(*this, u, ERH_THROW); | ||
IF_HEAVY_LOGGING(fod.ps = k.socket); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please find a better name, e.g. LogOnDestruct
or ScopedLog
.
struct ForceDestructor | |
{ | |
CUDTSocket* ps; | |
ForceDestructor(): ps(NULL){} | |
~ForceDestructor() | |
{ | |
if (ps) // Could be not acquired by SocketKeeper, occasionally | |
{ | |
HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy()); | |
} | |
} | |
} fod; | |
#endif | |
SocketKeeper k(*this, u, ERH_THROW); | |
IF_HEAVY_LOGGING(fod.ps = k.socket); | |
struct ScopedLog | |
{ | |
const CUDTSocket* const ps; | |
ScopedLog(const CUDTSocket* psock): ps(psock){} | |
~ScopedLog() | |
{ | |
if (ps) // Could be not acquired by SocketKeeper, occasionally | |
{ | |
HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy()); | |
} | |
} | |
}; | |
#endif | |
SocketKeeper k(*this, u, ERH_THROW); | |
IF_HEAVY_LOGGING(ScopedLog slog(k.socket)); |
int ret = close(k.socket); | ||
|
||
return close(s); | ||
return ret; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just return close(k.socket);
.
|
||
if (ps->isStillBusy()) | ||
{ | ||
HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE."); | ||
continue; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is still a hypothetical issue here (even if the code does not do it as of now) that after you check if the socket is busy or not, you don't block possible acquisition later on. Ideally it should be acquireIfNotBusy()
uniquely blocking the socket for destruction.
@@ -8723,11 +8723,14 @@ | |||
// srt_recvfile (which doesn't make any sense), you'll have a deadlock. | |||
if (m_config.bDriftTracer) | |||
{ | |||
//enterCS(m_RcvBufferLock); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am also not sure why m_GlobControlLock
is locked here.
But m_RcvBufferLock
protection is required.
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
if (!need_tsbpd) | ||
return 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change has been extracted to PR #2972 as not dependent on the rest changes from that PR.
Here is introduced the SocketKeeper class, similar to GroupKeeper, together with the busy counter. The goal of this solution is to keep the socket alive until the busy counter reaches 0. This allows various facilities that are still running after the socket is requested to be closed to finish their job without having the socket been physically deleted in the meantime (in the code there were many places with a high risk of this to happen).
The overall procedure is that the socket when closed is being moved from
m_Sockets
tom_ClosedSockets
and after this happens the socket is no longer dispatchable throughCUDTUnited::locateSocket
. After this happens, the cyclic call to GC may potentially delete the socket. But there can be still some activities running in other threads that might use this socket even when simultaneously moved tom_ClosedSockets
. This temporary container is intended to be used for that exactly purpose, to keep a socket that should no longer be visible until all threads finish using it. Various checks are being done inCUDT::checkBrokenSockets
to prevent deletion of sockets that are still in use by other threads, but new features in SRT have added extra situations when this may happen and it has been observed in the ThreadSanitizer that - although not fatal - an attempt to delete a socket that is simultaneously used to send a packet was detected.To prevent this, in various longer procedures that might be potentially caught in the middle of deletion of a socket there is applied a SocketKeeper. This does RAII operation on the m_iBusy counter so that it is set back to the previous value only when this procedure is finished. The loop over the
m_ClosedSockets
that selects the sockets to delete will check this and if this busy counter is not zero, deletion of a socket will miss this cycle (will have to wait for the next GC cycle to try again).A small controversy might exist about the free acquire/release functions and that's why the use of them should be exclusive to SocketKeeper so that the RAII mechanism can ensure that a temporary busy counter will always be withdrawn when no longer needed, and no such situation is permanent.